package com.shujia.kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object Demo01KafkaSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    // 指定Kafka Broker集群的地址
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // 指定消费者组ID
    properties.setProperty("group.id", "test")

    val flinkKafkaConsumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)

    flinkKafkaConsumer.setStartFromEarliest() // 从头开始消费
    //    flinkKafkaConsumer.setStartFromLatest()        // 从最新的数据开始消费，如果当前组是第一次消费 也会从头开始消费数据
    //    flinkKafkaConsumer.setStartFromTimestamp(...)  // 从某个时间点开始消费
    //    flinkKafkaConsumer.setStartFromGroupOffsets()  // 默认的 使用组的偏移量进行消费

    // 将Kafka的Consumer注册为Source -- 无界流
    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    // 基于Kafka数据统计单词数量
    kafkaDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }

}
